home *** CD-ROM | disk | FTP | other *** search
- #!/usr/local/bin/python
- # pcgifile.py - pcgi info file sanity testing - jeffbauer@bigfoot.com
- # Copyright(c) 1998, Jeff Bauer
-
- # 0.6a August 9, 1998
- # - added socket checking
- #
- # 0.5a August 7, 1998
- # - added NT compatibility
- # - improved import checking
- #
- # 0.4a July 27, 1998
- # - added checks for executable permissions
- # - print versions of relevant modules
-
- __version__ = "0.6a"
- Delimiter = '='
-
- # no class-based exceptions due to 1.4 compatbility
- PcgiFileException='PcgiFileException'
-
- class PcgiFile:
- def __init__(self, pcgifile):
- self.pcgifile = pcgifile
- self.infodict = {}
- self.combined = {}
- self.log = []
- self.resource = []
- self.file_contents = []
- self.module_version = []
- self.continue_testing = 1
- self.pcgi_wrapper = None
- try:
- self.readInfoFile()
- self.checkRequiredKeys()
- self.checkPCGIValues()
- self.lookupPCGIPublisher()
- self.checkWritePermissions()
- self.checkImports()
- self.checkSockets()
- except PcgiFileException:
- self.continue_testing = 0
-
- def checkImports(self):
- try:
- from cgi_module_publisher import publish_module
- except ImportError:
- self.log.append("error attempting: 'from cgi_module_publisher import publish_module'")
- raise PcgiFileException
-
- try:
- import cgi_module_publisher, CGIResponse
- self.module_version.append("%-20s %-7s %s" % \
- ('cgi_module_publisher',
- cgi_module_publisher.__version__,
- cgi_module_publisher.__file__) )
- self.module_version.append("%-20s %-7s %s" % \
- ('CGIResponse',
- CGIResponse.__version__,
- CGIResponse.__file__) )
- self.module_version.append("%-20s %-7s %s" %
- ('pcgifile',
- __version__,
- sys.argv[0]) )
- self.module_version.append("%-20s %-7s %s" % \
- ('pcgi-wrapper',
- self.getPcgiWrapperVersion(),
- self.pcgi_wrapper))
- except ImportError:
- pass
- except NameError:
- pass
-
- try:
- import pcgi_publisher
- except ImportError:
- if self.combined.has_key('PCGI_PUBLISHER'):
- d, s = os.path.split(self.combined['PCGI_PUBLISHER'])
- if not d in sys.path:
- sys.path.append(d)
- try:
- import pcgi_publisher
- self.module_version.append("%-20s %-7s %s" % \
- ('pcgi_publisher',
- pcgi_publisher.__version__,
- pcgi_publisher.__file__))
- except ImportError:
- pass
- except NameError:
- pass
-
- def checkPCGIValues(self):
- if self.combined.has_key('PCGI_EXE'):
- sw_exe = self.combined['PCGI_EXE']
- else:
- sw_exe = sys.executable
- self.log.append("advisory recommendation: specify PCGI_EXE=%s" % \
- sys.executable)
- if os.path.isfile(sw_exe):
- self.resource.append("Executable:\t%s" % sw_exe)
- else:
- self.log.append("executable not found: %s" % sw_exe)
- raise PcgiFileException
- if self.combined.has_key('PCGI_PID_FILE'):
- f = self.combined['PCGI_PID_FILE']
- d = os.path.split(f)[0]
- if os.path.isdir(d):
- self.resource.append("PID file:\t%s" % f)
- else:
- self.log.append("directory not found: %s" % d)
- raise PcgiFileException
- if self.combined.has_key('PCGI_SOCKET_FILE'):
- f = self.combined['PCGI_SOCKET_FILE']
- d = os.path.split(f)[0]
- if os.path.isdir(d):
- self.resource.append("Socket file:\t%s" % f)
- else:
- self.log.append("directory not found: %s" % d)
- raise PcgiFileException
- if not self.combined.has_key('PCGI_NAME'):
- self.log.append("advisory recommendation: specify PCGI_NAME")
- if self.combined.has_key('PCGI_MODULE_PATH'):
- p = self.combined['PCGI_MODULE_PATH']
- if os.path.isfile(p):
- self.resource.append("Module:\t%s" % p)
- else:
- self.log.append("module not found: %s" % p)
- raise PcgiFileException
- if self.combined.has_key('PCGI_ERROR_LOG'):
- self.resource.append("Error Log:\t%s" % \
- self.combined['PCGI_ERROR_LOG'])
- if self.combined.has_key('PCGI_WORKING_DIR'): # deprecated
- d = self.combined['PCGI_WORKING_DIR']
- if os.path.isfile(d):
- self.resource.append("Working Directory:\t%s" % d)
- else:
- self.log.append("working directory not found: %s" % d)
- raise PcgiFileException
-
- def checkRequiredKeys(self):
- """
- Check for the required PCGI keys.
- """
- for (k,v) in os.environ.items():
- self.combined[k] = v
- for (k,v) in self.infodict.items():
- if self.combined.has_key(k):
- self.log.append("%s=%s, overwrites: %s" % (k, v, self.combined[k]))
- self.combined[k] = v
- for k in ['PCGI_PID_FILE','PCGI_SOCKET_FILE','PCGI_MODULE_PATH']:
- if not self.combined.has_key(k):
- self.log.append("missing parameter: %s" % k)
- raise PcgiFileException
- # PCGI_INFO_FILE is assigned by the pcgi-wrapper, so that it
- # may be known (made available) to pcgi_publisher.
- self.combined['PCGI_INFO_FILE'] = self.pcgifile
-
- def checkSockets(self):
- """
- Check for possible socket-related error conditions.
- """
- try:
- import socket
- except ImportError:
- self.log.append("unable to import socket module")
- raise PcgiFileException
-
- port = None
- if self.combined.has_key('PCGI_PORT'):
- try:
- port = string.atoi(self.combined['PCGI_PORT'])
- except ValueError:
- self.log.append("invalid port '%s', PCGI_PORT must be an integer" % self.combined['PCGI_PORT'])
- raise PcgiFileException
- if os.name == 'posix':
- if port:
- self.log.append("cannot specify PCGI_PORT directive on Unix - no support for INET sockets")
- raise PcgiFileException
- elif not port:
- self.log.append("win32 platform must specify missing PCGI_PORT directive (default=7244)");
- raise PcgiFileException
-
- if port:
- if self.combined.has_key('PCGI_HOST'):
- hostname = self.combined['PCGI_HOST']
- if hostname != socket.gethostname():
- self.log.append("advisory recommendation: PCGI_HOST '%s' doesn't match '%s'" % (hostname, socket.gethostname()))
- else:
- hostname = socket.gethostname()
-
- if port:
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind((hostname, port))
- except socket.error:
- self.log.append("error creating/binding INET socket (%s, %s)" % (hostname, port))
- raise PcgiFileException
- else:
- try:
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- except socket.error:
- self.log.append("error creating UNIX socket")
- raise PcgiFileException
-
- socketFile = self.combined.get('PCGI_SOCKET_FILE')
- if os.path.exists(socketFile):
- self.log.append("advisory: socket %s in use, bind() not tested" % socketFile)
- else:
- try:
- sock.bind(socketFile)
- os.unlink(socketFile)
- except socket.error:
- self.log.append("error binding UNIX socket (%s)" % socketFile)
- raise PcgiFileException
-
- def checkWritePermissions(self):
- """
- Check write permissions for PCGI_SOCKET_FILE, PCGI_PID_FILE, and
- (if specified) PCGI_ERROR_LOG.
- """
- fn = {}
- if self.combined.has_key('PCGI_PID_FILE'):
- fn['PCGI_PID_FILE'] = self.combined['PCGI_PID_FILE']
- if self.combined.has_key('PCGI_SOCKET_FILE'):
- fn['PCGI_SOCKET_FILE'] = self.combined['PCGI_SOCKET_FILE']
- if self.combined.has_key('PCGI_ERROR_LOG'):
- fn['PCGI_ERROR_LOG'] = self.combined['PCGI_ERROR_LOG']
- for key, file in fn.items():
- if os.path.exists(file):
- try:
- f = open(file,'a+')
- f.close()
- except IOError:
- self.log.append("%s write permission error: %s" % \
- (key, file))
- raise PcgiFileException
- else:
- path = os.path.split(file)[0]
- import tempfile
- tempfile.tempdir = path
- tmpfile = tempfile.mktemp()
- try:
- f = open(tmpfile,'w+')
- f.close()
- os.unlink(tmpfile)
- except IOError:
- self.log.append("%s write permission error: %s" % \
- (key, file))
- raise PcgiFileException
-
- def environList(self):
- """
- return a sorted list of how the environment would likely appear
- if run through the pcgi-wrapper.
- """
- e = []
- keys = self.combined.keys()
- keys.sort()
- for k in keys:
- e.append("%s\t%s" % (k, self.combined[k]))
- return e
-
- def getPcgiWrapperVersion(self):
- """
- Execute pcgi-wrapper with no arguments and grab the version id.
- """
- try:
- import tempfile
- tmpfile = tempfile.mktemp()
- os.system("%s > %s" % (self.pcgi_wrapper, tmpfile))
- f = open(tmpfile, 'r')
- r = f.readlines()
- f.close()
- os.unlink(tmpfile)
- for l in r:
- s = string.strip(l)
- if s[:21] == 'pcgi-wrapper-version ':
- return string.split(s)[1]
- except ImportError:
- pass
- return None
-
- def isexecutable(self, path, real=None):
- if os.name == 'posix':
- return self.pathperm(path, real)[2]
- else:
- return 1
-
- def lookupPCGIPublisher(self):
- """
- The most efficient way for pcgi-wrapper to determine which
- pcgi_publisher to use is for the pcgi info file to specify
- it with the PCGI_PUBLISHER directive. Using the PCGI_PUBLISHER
- is arguably the *best* method, as pcgi-wrapper will find it
- quicker than otherwise. Still, in the interest of flexibility,
- pcgi-wrapper will attempt to locate pcgi_publisher using the
- following rules:
-
- 1. PCGI_PUBLISHER (*best*)
-
- Rules 2-5, look in the paths below for files named: pcgi_publisher.py,
- pcgi_publisher.pyc, pcgi_publisher.pyo, pcgi_publisher.
- 2. PCGI_INSERT_PATH, if available
- 3. PYTHONPATH, if available
- 4. Look in the directory of PCGI_MODULE_PATH
- 5. Look in the directory of the pcgi info file
- """
- if self.combined.has_key('PCGI_PUBLISHER'):
- p = self.combined['PCGI_PUBLISHER']
- if os.path.isfile(p):
- self.resource.append("Publisher:\t%s" % p)
- else:
- self.log.append("publisher not found: %s" % p)
- raise PcgiFileException
- return
- self.log.append("advisory recommendation: specify PCGI_PUBLISHER")
- # search through combined PCGI_INSERT_PATH + PYTHONPATH directories
- searchPath = ""
- if self.combined.has_key('PCGI_INSERT_PATH'):
- searchPath = searchPath + self.combined['PCGI_INSERT_PATH']
- if self.combined.has_key('PYTHONPATH'):
- searchPath = searchPath + self.combined['PYTHONPATH']
- publisherName = ['pcgi_publisher.py','pcgi_publisher.pyc','pcgi_publisher.pyo','pcgi_publisher']
- for d in string.split(searchPath, ':'):
- for p in publisherName:
- pcgiPublisher = "%s%s%s" % (d, os.sep, p)
- if os.path.isfile(pcgiPublisher):
- self.resource.append("Publisher:\t%s" % pcgiPublisher)
- return
- # look in module directory
- if self.combined.has_key('PCGI_MODULE_PATH'):
- (d, x) = os.path.split(self.combined['PCGI_MODULE_PATH'])
- for p in publisherName:
- pcgiPublisher = "%s%s%s" % (d, os.sep, p)
- if os.path.isfile(pcgiPublisher):
- self.resource.append("Publisher:\t%s" % pcgiPublisher)
- return
- # look in pcgi info file directory
- (d, x) = os.path.split(self.pcgifile)
- for p in publisherName:
- pcgiPublisher = "%s%s%s" % (d, os.sep, p)
- if os.path.isfile(pcgiPublisher):
- self.resource.append("Publisher:\t%s" % pcgiPublisher)
- return
- self.log.append("Unable to locate the pcgi_publisher")
- raise PcgiFileException
-
- def pathperm(self, path, real=None):
- """
- Returns a 3-tuple of booleans indicating whether the process has
- (read, write, execute) permission. A true value for the 'real'
- argument indicates the test should occur for the real id rather
- than the effective id.
- """
- stat = os.stat(path)
- if real is None:
- uid = os.geteuid()
- gid = os.getegid()
- else:
- uid = os.getuid()
- gid = os.getgid()
-
- if uid == stat[4]:
- return (00400 & stat[0], 00200 & stat[0], 00100 & stat[0])
- elif gid == stat[5]:
- return (00040 & stat[0], 00020 & stat[0], 00010 & stat[0])
- else:
- return (00004 & stat[0], 00002 & stat[0], 00001 & stat[0])
-
- def readInfoFile(self):
- max_directives = 12 # arbitrary number defined in pcgi.h
- if not os.path.isfile(self.pcgifile):
- self.log.append("unable to locate file: %s" % self.pcgifile)
- raise PcgiFileException
- elif not self.isexecutable(self.pcgifile):
- self.log.append("info file '%s' not executable" % self.pcgifile)
- raise PcgiFileException
- f = open(self.pcgifile, 'r')
- lc = 0
- for r in f.readlines():
- lc = lc + 1
- s = string.strip(r)
- self.file_contents.append(s)
- if lc == 1:
- if s[:2] != '#!':
- self.log.append("first line missing header, e.g. #!/usr/local/bin/pcgi-wrapper")
- raise PcgiFileException
- else:
- self.pcgi_wrapper = string.strip(s[2:])
- if not os.path.isfile(self.pcgi_wrapper):
- self.log.append("unable to find wrapper: %s" % \
- self.pcgi_wrapper)
- raise PcgiFileException
- elif not self.isexecutable(self.pcgi_wrapper):
- self.log.append("wrapper '%s' is not executable" % \
- self.pcgi_wrapper)
- raise PcgiFileException
-
- if len(s) < 1 or s[0] == '#':
- continue
- pos = string.find(s, Delimiter)
- if pos < 0:
- self.log.append("missing '%s' delimiter at line %d: %s" % \
- (Delimiter, lc, s))
- else:
- self.infodict[string.strip(s[0:pos])] = string.strip(s[pos+1:])
- f.close()
- if len(self.infodict.keys()) > max_directives:
- self.log.append("info fileexceeds maximum (%d) number of directives" % max_directives)
- raise PcgiFileException
-
- class PcgiFileTest:
- """
- CGI sanity check of the pcgi info file.
- """
- def __init__(self):
- fs = cgi.FieldStorage()
- infofile = None
- if fs.has_key('infofile'):
- infofile = fs['infofile'].value
- elif fs.has_key('filename'):
- infofile = fs['filename'].value
-
- if infofile is None:
- print "Please specify the pcgi info file in the following manner:"
- print "<pre>"
- print " http://.../cgi-bin/pcgifile.py?<STRONG>infofile=</STRONG><I>pcgifile</I>"
- print "</pre>"
- print "where <I>pcgifile</I> is the absolute path of the pcgi info file."
- else:
- print "<pre>"
- print "<strong>Python %s</strong>" % sys.version
- if os.environ.has_key('SERVER_SOFTWARE'):
- print "<strong>%s</strong>" % os.environ['SERVER_SOFTWARE']
- print
- print "PCGI info file:\t%s" % infofile
- pcgiFile = PcgiFile(infofile)
- print "PCGI wrapper:\t%s" % pcgiFile.pcgi_wrapper
- for m in pcgiFile.log:
- print m
- if not pcgiFile.continue_testing:
- print "status: FAILURE"
- print
- print "<STRONG>%s</STRONG>" % infofile
- for r in pcgiFile.file_contents:
- print " %s" % r
- else:
- print "looks OK"
- print
- print "<STRONG>%s</STRONG>" % infofile
- for r in pcgiFile.file_contents:
- print " %s" % r
- print
- print "<STRONG>Likely publisher resource values:</STRONG>"
- for r in pcgiFile.resource:
- print " %s" % r
- print
- print "<STRONG>Versions of modules used:</STRONG>"
- for r in pcgiFile.module_version:
- print " %s" % r
- print
- print "<STRONG>Resulting environment will probably appear to the publisher as:</STRONG>"
- for e in pcgiFile.environList():
- print " %s" % e
-
- def test():
- usage = 'usage: pcgifile pcgi_info_file'
- if len(sys.argv) < 2:
- print usage
- sys.exit(1)
- infoFile = sys.argv[1]
- pcgiFile = PcgiFile(infoFile)
- for m in pcgiFile.log:
- print m
- if pcgiFile.continue_testing:
- print "%s looks OK" % infoFile
-
- if __name__ == '__main__':
- try:
- import cgi, os, sys, string, traceback
- if os.environ.has_key('SERVER_PROTOCOL'):
- print "Content-type: text/html"
- print "Expires: Monday, 1-Jan-96 00:00:00 GMT"
- print "Pragma: no-cache"
- print
- sys.stderr = sys.stdout
- try:
- pcgiFileTest = PcgiFileTest()
- except:
- print "<pre>"
- traceback.print_exc()
- else:
- test()
- except ImportError:
- print "Content-type: text/html"
- print
- print "error during python imports; to fix try adding to pcgifile.py: <br><pre>"
- print " sys.path[0:0] = [path1, path2, ...]"
-